Skip to content
工作原理
发送端
- 发送端插入消息(生成序列号,记录事务号),消息状态为操作前
- 业务操作,若成功,改消息状态为已操作,若失败,删除消息
- 通知服务发消息
服务端
- 接收发消息通知
- 配置队列订阅关系
- 发送消息到队列,若成功,改消息状态为已发送,若失败无视
- 接收消息,写入推送表,如果已经写入,则不写入
- 消息ack
- 查询同一个订阅者下,相同key的序号更小的消息是否都已成功推送,如果还有没推送的则不推送,如果都已推送,则推送本条消息,这样可以保证同一个订阅者下相同key的消息保序
- 推送, 两个条件下认为成功:1.返回 http code 为200,且json code 为0,说明接口成功执行,2. 返回http code 为404说明接口不存在,这种情况只应该存在于灰度状态,说明当前灰度不应该推送消息。除这两个条件,其他都认为失败
- 修改推送状态为成功推送或者推送失败
校验补漏部分
- 未操作状态的消息,校验实际业务已操作的,补发,否则删除
- 已操作状态的消息,补发
- 已发送状态的消息,校验推送表是否已完整收到,若没有,补发,若都已收到,标记已接收
- 已接收和推送失败状态的推送,补退
注:队列关系配置逻辑
- 发送消息到队列前服务端会确保队列内的订阅关系配置正确
- exchange 的名字取 topic 名
- 队列名取订阅名.序号,序号是从0开始的数字,订阅的 tag 是 topic.tag.序号
- 发送消息的标签格式是:topic.消息本身带的标签.序号
- 消息标签中的序号由发送端计算得到,假如队列有4个,那么根据key值对4取余得到序号
- 服务端发送消息前会根据队列的堆积情况处理队列:1. 如果当前队列平均堆积超过一个阈值,增加订阅队列和接收消息线程,根据新的队列数计算消息标签中的序号值,2. 如果当前队列中平均堆积小于1,减少一个队列计数,根据新的队列计数计算序号值,3. 假如队列中有队列超过1min没有收到新的消息,删除队列,但会至少保证有一个订阅队列。